package day05;

import beans.SensorReading;
import day03.window.FlinkWindow00;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Flink Table API and SQL: a minimal introduction to the Table API.
 *
 * <p>The same projection and filter (keep {@code id} and {@code temperature}
 * for rows whose id is {@code sensor_1}) is expressed twice, once through the
 * Table API and once through Flink SQL, and both results are printed.
 *
 * @author lvbingbing
 * @date 2022-01-13 14:02
 */
public class FlinkTableApi00 {

    /**
     * Program entry point: builds the pipeline and submits the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Build the FlinkWindow00 helper with a parallelism of 1. According to the
        // original author's note its constructor initialises the environment and
        // reads from a socket text stream (defined in FlinkWindow00, not shown here).
        FlinkWindow00 window = new FlinkWindow00(1);
        // Grab the execution environment up front; it is used to launch the job.
        StreamExecutionEnvironment executionEnv = window.getEnv();
        // Register the Table API and SQL operators on the stream.
        studyTableApi(window);
        // Trigger program execution.
        executionEnv.execute();
    }

    /**
     * Basic Table API usage: runs one query via the Table API and the same
     * query via Flink SQL, printing each result stream.
     *
     * @param flinkWindow supplier of the execution environment and the sensor reading stream
     */
    private static void studyTableApi(FlinkWindow00 flinkWindow) {
        // Step 1: create the table environment on top of the streaming environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(flinkWindow.getEnv());

        // Step 2: turn the sensor reading stream into a table.
        SingleOutputStreamOperator<SensorReading> readings = flinkWindow.getSingleOutputStreamOperator();
        Table sourceTable = tableEnv.fromDataStream(readings);

        // Step 3: transform with the Table API, projecting first and filtering second.
        Table projected = sourceTable.select("id, temperature");
        Table filtered = projected.where("id = 'sensor_1'");
        printAsAppendStream(tableEnv, filtered, "tableApi");

        // Step 4: the same query with Flink SQL, against a temporary view of the stream.
        tableEnv.createTemporaryView("sensor", readings);
        Table sqlResult = tableEnv.sqlQuery("select id, temperature from sensor where id = 'sensor_1'");
        printAsAppendStream(tableEnv, sqlResult, "flinkSQL");
    }

    /**
     * Converts a table to an append-only stream of rows and prints it.
     *
     * @param tableEnv table environment used for the conversion
     * @param table    table to convert
     * @param label    identifier passed to {@code print} to tag the output
     */
    private static void printAsAppendStream(StreamTableEnvironment tableEnv, Table table, String label) {
        DataStream<Row> rows = tableEnv.toAppendStream(table, Row.class);
        rows.print(label);
    }
}